package gu.simplemq.jms;

import java.util.Hashtable;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import gu.simplemq.BaseMQDispatcher;
import gu.simplemq.MessageAck;
import gu.simplemq.SimplemqContext;
import gu.simplemq.exceptions.SmqRuntimeException;

import static com.google.common.base.Preconditions.checkNotNull;

/**
 * JMS消息分发器抽象实现(线程安全)<br>
 * @author guyadong
 *
 */
abstract class BaseJmsDispatcher extends BaseMQDispatcher<Connection> implements JMSReconnectCallback{
	private final Hashtable<String, ConsumerContext> consumers = new Hashtable<>();
	private final AutoReconnectAdapter autoReconnectAdapter = new AutoReconnectAdapter(this);
	private Connection connection;
	/**
	 * 创建消息接收处理对象
	 * @param session 
	 * @param name name of queue or topic
	 * @param session
	 * @return 消息消费对象
	 * @throws JMSException
	 */
	protected abstract Destination makeDestination(Session session, String name) throws JMSException;
	BaseJmsDispatcher(JmsPoolLazy pool) {
		super(pool);
	}
	
	@Override
	protected void doInit() throws JMSException{
		connection = getConnection();
		/** 成功获取连接后设置异常侦听器 */
		new ExceptionListenerContainer(autoReconnectAdapter).bind(connection);
		connection.start();
	}
	@Override
	protected void doUninit() {
	}
	@Override
	protected void doSub(String channel) throws JMSException {
		synchronized (consumers) {
			ConsumerContext ctx = consumers.get(channel);
			if(null == ctx){
				ctx = new ConsumerContext();
				boolean autoack = getChannel(channel).isAutoack();
				ctx.session = checkNotNull(connection,"connection is uninitialized")
						.createSession(Boolean.FALSE, autoack ? Session.AUTO_ACKNOWLEDGE: Session.CLIENT_ACKNOWLEDGE);
				Destination destination = makeDestination(ctx.session, channel);
				ctx.consumer = ctx.session.createConsumer(destination);
				ctx.consumer.setMessageListener(new ActivemqListener(channel, autoack));
				consumers.put(channel, ctx);
			}
		}
	}

	@Override
	protected void doUnsub(String channel) throws JMSException {
		synchronized (consumers) {
			ConsumerContext ctx = consumers.remove(channel);
			if(null != ctx){
				ctx.consumer.close();
			}
		}
	}
	@Override
	public void onConnectionLost() throws Exception {
		synchronized (consumers) {
			for(ConsumerContext ctx : consumers.values()) {
				try {
					ctx.consumer.close();
				} catch (Exception e) {
					logger.error(e.getMessage());
				}
			}
			consumers.clear();
		}
		uninit();
	}
	
	@Override
	public void tryReconnecting() throws Exception {
		init();
		/** 重新初始化并订阅所有频道 */
		subscribe();
	}
	@Override
	public String ownerName() {
		return getClass().getSimpleName();
	}
	private class ActivemqListener implements MessageListener{
		private final String channel;
		private final boolean autoack;
		ActivemqListener(String channel, boolean autoack) {
			this.channel = channel;
			this.autoack = autoack;
		}
		
		private String textOf(Message message) throws JMSException{
			if(message instanceof TextMessage){
				return ((TextMessage) message).getText();
			}
			if(message instanceof BytesMessage){
				BytesMessage bytesMessage = (BytesMessage)message;
				byte[] buf = new byte[(int) bytesMessage.getBodyLength()];
				bytesMessage.readBytes(buf);
				return new String(buf,UTF_8);
			}
			throw new IllegalArgumentException(String.format("INVALID message type,%s,%s required",
					TextMessage.class.getName(),
					BytesMessage.class.getName()));
		}
		@Override
		public void onMessage(Message message) {
			try {
				if(!autoack) {
					SimplemqContext.getCurrentContext().setMessageAck(new JMSMessageAck(message));
				}
				// 调用 IMessageDispatcher 接口分发消息
				dispatch(channel, textOf(message));
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
		
	}
	/**
	 * JMS 消息确认实现
	 * @author guyadong
	 * @since 2.3.9
	 */
	private static class JMSMessageAck implements MessageAck{
		private final Message message;
		
		private JMSMessageAck(Message message) {
			this.message = checkNotNull(message,"message is null");
		}

		@Override
		public void acknowledge() throws SmqRuntimeException {
			try {
				message.acknowledge();
			} catch (JMSException e) {
				throw new SmqRuntimeException(e); 
			}
		}
		
	}
	class ConsumerContext {
		MessageConsumer consumer;
		Session session;
		ConsumerContext() {
		}
	}
}
